[AWS Step Functions]Mapステート内でエラーをキャッチしてステートマシンを失敗させる(AWS CDK)
具体的なユースケース
Step FunctionsのMapステートを利用している方で、以下2つのケースを満たしたい方向けです。
- Mapステートを利用していて、エラー時に並列処理を中断させたくない
- Mapステート内で1つでもエラーがある場合、ステートマシンを失敗させたい
前置き
Mapステート内でエラーをキャッチした上で、ステートマシンを失敗させるなんてなんでややこしいことするの?と思うかもしれませんが、まずはMapステートの動作を理解する必要があります。
Mapステートのデフォルト動作
まずエラー時にMapステートのデフォルト動作を確認してみます。サンプルとして全リージョンにMapステートを並列実行するものを利用しています。
Map内の並列処理で1つでも失敗した場合、Mapステートが失敗となりステートマシン自体も失敗となりました。
このとき他の並列処理はキャンセル扱いとなってしまうため、最後まで処理されずにキャンセルされます。今回は1つのリージョンで処理が失敗したため、処理途中の13リージョンが途中でキャンセルされています。
このようなケースでは、失敗した1リージョンに原因があるのかわかりません、今回キャンセルされたリージョンが次の実行時にエラーとなる可能性もあるため、全リージョン実行して結果を確認したくなります。
並列処理を停止させない実装
そこで、並列処理を停止させずに完了させる方法を調べると、Map内でエラーをキャッチすることでMapステート全体が失敗することを防ぐことができるようです。
上記を参考にエラーをキャッチするステップをMap内に追加してみました。CDKのコードは以下ブログを参考にしてください。
これで実行してみるとエラーキャッチのステップ自体は成功となるため、想定通りMapステートが失敗することなく並列処理が実行されます。
これでMapステートが途中で止まることなく、全リージョンの処理を成功させることができました。
しかし、この場合はエラーがMap内でキャッチされてしまうためステートマシン自体が成功扱いとなります。(Mapステートが成功となるため)
Map処理を中断させないことは達成できましたが、このままではエラーに気づけない恐れが…。
そこで解決策の1つとして、Mapステート外でエラーを判定してステートマシンを失敗させる方法を考えました。
並列処理を失敗させずにステートマシンを失敗させる
前置きが長くなりましたが、ここからが本エントリの本題です。
考えた結果、以下のような形で実装してみました。Mapステートの後にMapステートの出力編集(result)とエラー判定(choice)し、成功(success)と失敗(failed)のステップへ分岐するようにしています。
ステートマシン
CDKサンプル
実装はCDK(Typescript)で行なっています。
import { Construct } from 'constructs'; import { Stack, StackProps, Duration, aws_iam as iam, aws_lambda as lambda, aws_stepfunctions as sfn, aws_stepfunctions_tasks as tasks, } from 'aws-cdk-lib'; export class AwsCdkMultiRegionSfnStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); //IAMロール const lambdaRole = new iam.Role(this, "lambdaRole", { assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"), }); lambdaRole.addManagedPolicy( iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole") ); lambdaRole.addManagedPolicy( iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonEC2ReadOnlyAccess") ); //リージョン一覧を取得するLambda const createGetRegionLambdaFunction = new lambda.Function(this, 'createGetRegionLambdaFunction', { code: new lambda.AssetCode("lambda"), runtime: lambda.Runtime.PYTHON_3_9, functionName: "GetRegionLambdaFunction", handler: 'get_regions.handler', role: lambdaRole, timeout: Duration.seconds(180), }); //リージョンを出力するLambda const createPrintRegionLambdaFunction = new lambda.Function(this, 'createPrintRegionLambdaFunction', { code: new lambda.AssetCode("lambda"), runtime: lambda.Runtime.PYTHON_3_9, functionName: "PrintRegionLambdaFunction", handler: 'print_region.handler', role: lambdaRole, timeout: Duration.seconds(180), }); const getRegionsTask = new tasks.LambdaInvoke(this, 'getRegionsTask', { lambdaFunction: createGetRegionLambdaFunction, resultPath: '$.getRegions', }); const printRegionTask = new tasks.LambdaInvoke(this, 'printRegionTask', { lambdaFunction: createPrintRegionLambdaFunction, resultSelector: { "Region.$": "$.Payload", }, }); //Mapステート const printRegionMap = new sfn.Map(this, 'printRegionMap', { inputPath: "$.getRegions.Payload.Regions", resultPath: "$.printRegionMap" }); //MAPのイテレーターにRegion出力タスクを指定 printRegionMap.iterator(printRegionTask) //MAP内エラー取得 const printRegionTaskError = new sfn.Pass(this, 'printRegionTaskError') printRegionTask.addCatch(printRegionTaskError, { resultPath: '$.Error' }) // result(結果の文字列を結合) const result = new sfn.Pass(this, 'result', { parameters: { "jsonToString.$": "States.JsonToString($.printRegionMap)" }, resultPath: "$.result" }); // 失敗 const failed = new sfn.Fail(this, 'failed'); // 成功 const success = new sfn.Succeed(this, 'success'); // エラー判定 const check = new sfn.Choice(this, 'check'); check.when( sfn.Condition.stringMatches(sfn.JsonPath.stringAt("$.result.jsonToString"), "*Error*"), failed ) check.otherwise(success) //ステートマシン const definition = getRegionsTask.next(printRegionMap).next(result).next(check); new sfn.StateMachine(this, 'RegionMapStateMachine', { stateMachineName: "RegionMapStateMachine", definition: definition, }); } }
Lambda(Python)
Lambdaはリージョン取得と、出力用で2つを用意しました。
リージョン取得用(getRegionsTask)
リージョンを取得したあと、リストの形式だとステートマシン内で扱いにくいため、オブジェクト(辞書型)の形式にしてreturnしています。
import boto3 def handler(event, context): ec2 = boto3.client('ec2') regions = list(map(lambda x: x['RegionName'], ec2.describe_regions()['Regions'])) ret = [] for region in regions: ret.append( { "Region": region, } ) return {"Regions": ret}
リージョン出力用(printRegionTask)
こちらはインプットとしてRegionを取得してprintだけしています。今回はMap内で1つだけエラーとしたいため、東京リージョンのみエラーになるようにしています。
def handler(event, context): region = event.get("Region") print(region) if region == "ap-northeast-1": raise return region
これらをCDKでデプロイしています。
やってみる
CDKでデプロイできたら、実際にエラーがうまく判定できるか試してみます。インプットは不要なので、そのまま実行してみるとするとステートマシンは想定通り失敗となりました。
Mapステートは成功していますが、後続のchoiceで failed に分岐しています。これは、Mapステートで並列実行された中でエラーとなったことを判定されたためです。複数リージョン失敗した場合でも、同じようにfailedに遷移します。
ここではどのようにエラー判定が行われているか分かりにくいので、実際の入出力を追ってみます。(全て記載すると、不要な情報も多いため解説に不要な部分は省略しています。)
入出力の確認
Mapステートの出力は各リージョンごとの結果が含まれるため、配列の形で出力されます。以下の場合、東京リージョンのみがエラーとなっているケースです。
Mapステート出力
"printRegionMap": [ { "Region": "eu-north-1" }, { "Region": "ap-northeast-1", "Error": { "Error": "RuntimeError", "Cause": "{\"errorMessage\":\"No active exception to reraise\",\"errorType\":\"RuntimeError\",\"requestId\":\"369e40bb-deb8-429d-af0d-058307aed690\",\"stackTrace\":[\" File \\\"/var/task/print_region.py\\\", line 6, in handler\\n raise\\n\"]}" } }, { "Region": "sa-east-1" } ~~略~~ ]
このままではエラーの判定ができないため、Mapステートの入力を次の result ステップで文字列を連結します。
result出力
"result": { "jsonToString": "[{\"Region\":\"eu-north-1\"},{\"Region\":\"ap-northeast-1\",\"Error\":{\"Error\":\"RuntimeError\",\"Cause\":\"{\\\"errorMessage\\\":\\\"No active exception to reraise\\\",\\\"errorType\\\":\\\"RuntimeError\\\",\\\"requestId\\\":\\\"369e40bb-deb8-429d-af0d-058307aed690\\\",\\\"stackTrace\\\":[\\\" File \\\\\\\"/var/task/print_region.py\\\\\\\", line 6, in handler\\\\n raise\\\\n\\\"]}\"}},{\"Region\":\"sa-east-1\"}]" }
CDK実装では以下の部分です。Mapステートの出力($.printRegionMap)を結合した結果をパラメータjsonToStringとして設定。出力先を$.resultとしています。
// result(結果の文字列を結合) const result = new sfn.Pass(this, 'result', { parameters: { "jsonToString.$": "States.JsonToString($.printRegionMap)" }, resultPath: "$.result" });
これでMapの出力が文字列として扱えるようになりました。
次に Choice のステップではresultの結果にError
の文字列が含まれているかを判定。含まれている場合はfailedへ遷移するようにしています。
CDKでは以下の部分です。
// エラー判定 const check = new sfn.Choice(this, 'check'); check.when( sfn.Condition.stringMatches(sfn.JsonPath.stringAt("$.result.jsonToString"), "*Error*"), failed ) check.otherwise(success)
一応成功するパターンも試しましたが、問題なくsuccessに遷移しました。
おわりに
これで無事にMapステートを失敗させずに、ステートマシンを失敗させることができました。これに合わせてステートマシンの失敗を通知する仕組みがあればエラーに気づくことができそうですね。
一応Mapステート内でエラー時に通知するステップを追加する方法も考えたのですが、全ての並列実行がエラーとなった通知量がすごいことになりそうなのでやめました。
ステートマシンの実行完了を通知する仕組みは、以下を参考にすると幸せになれるかもしれません。
AWS Step Functionsの実行完了をAWS Chatbotを使っていい感じにSlackに通知する | DevelopersIO
Step FunctionsでMapを実装している方の助けになれば幸いです。